Использование AMQP на примере RabbitMQ

Использование AMQP на примере RabbitMQ

Содержание

5 минут
Артём
Лисовский
Head of learning


Я Артём Лисовский, head of learning в IT-компании kt.team. Статья составлена на базе выступления для команды kt.team и может быть полезна и интересна всем разработчикам, которые пишут сервисы с высокими требованиями к отказоустойчивости и масштабируемости.

Сегодня познакомимся с RabbitMQ — программным брокером сообщений на основе стандарта AMQP. Чтобы снизить порог вхождения в тему, я буду объяснять работу с брокером и очередями на наглядном примере. Допустим, мы управляем небольшим заведением общественного питания (пусть это будет ларёк с шаурмой).

Что такое очереди сообщений и зачем они нужны. Варианты реализации очереди сообщений. Брокер очередей

Работа ларька шаурмы очень похожа на работу web-сайта, потому что и там и там посетители выполняют похожие действия:

1 предоставляют данные, которые нужно обработать;

2 получают какие-то данные в ответ.

Слайд № 1

Слайд 1

В обычной жизни аналог GET-запросов — это меню заведения и всё то, что мы видим, внешняя оболочка.

Слайд № 2

А POST-запросы состоят из желания сделать заказ, просьбы дать книгу жалоб и так далее; всё, что мы можем запросить у продавца.

В чём минус обычного меню? Когда покупатель подходит к ларьку, у него нет обратной связи по складу, он не знает, сколько и каких лавашей есть в наличии. Поэтому ему приходится выяснять, есть ли шаурма в толстом лаваше или, наоборот, в тонком. Продавец иногда будет отвечать «да», иногда — «нет», и было бы неплохо всё это автоматизировать, чтобы избавиться от лишних операций.

Слайд № 3

Для такой автоматизации есть множество систем, но сегодня мы поговорим не о них.

Слайд № 4

Какой самый простой способ оптимизировать большое количество заказов, например во время бизнес-ланча? Торговцы шаурмой уже вовсю его используют. Нужно обеспечить покупателям возможность заказа по звонку (заказать и забрать через 15 минут). Минус в том, что нам всё ещё нужно обрабатывать эту очередь, и телефонная линия может не справиться с наплывом звонков. Тогда клиенты останутся без заказов, и мы потеряем в выручке.

Слайд № 5

Другой способ: пригласить друзей и начать готовить шаурму вместе, но тогда у нас возникают новые проблемы. Всё потому, что очередью никто не управляет.

Слайд № 6

Когда к нам поступает много параллельных заказов и мы не знаем, кто за какой заказ будет отвечать, нам нужен менеджер, который эту очередь заказов распределит по исполнителям.

Если этого не сделать, у нас появляются следующие риски:

  • потерять заказ;
  • взяться за один заказ нескольким людям одновременно или, наоборот, не взяться вообще;
  • заказчик уйдёт, а мы продолжим делать шаурму, за которую никто не заплатит;
  • если один и тот же исполнитель и готовит, и принимает оплату, ему придётся слишком часто снимать перчатки, из-за чего будет тратиться слишком много времени, и так далее.


В общем, возникает небольшой хаос, как в типичной модели управления проектами waterfall.

Как можно решить эту проблему?

  • Первый способ
  • Самое простое — поставить над всеми исполнителями старшего официанта, менеджера, кипера, который будет распределять заказы. Если так сделать, то до поры до времени всё будет идти вполне неплохо.

    В таких целях на маленьких объектах часто делают MySQL-табличку, в которую записывают очередь. Напротив каждой записи проставляют статус «выполнено» / «не выполнено» и далее берут данные для работы из этой таблички. Наиболее явные минусы этого способа:

1 очевидно, что мы не блокируем содержимое, т. е. несколько работников (исполнителей) могут взяться за один и тот же заказ;

2 если будет много заказов, а MySQL лежит у нас на том же сервере, что и база (а в основном так и бывает), мы можем просто «положить» нашу базу — и всё пойдёт плохо.

Слайд № 7

  • Второй способ
  • Использовать брокер очередей, который будет управлять нашей очередью. Мы можем задать брокеру определённую логику распределения этой очереди, чтобы все сообщения были гарантированно получены и обработаны нужным образом.

Слайд № 8

Посмотрим, что предлагается в коробке большинства брокеров (и в RabbitMQ тоже), на примере решаемой задачи.

1 Заказы (и не только) гарантированно записываются в очередь.

2 Заказы распределяются между поварами.

3 Если повар, скажем, отошёл или сломал руку, заказ остаётся в очереди. Этот повар может взять заказ позже, или его может взять другой повар.

4 Заказ блокируется (чтобы один и тот же заказ не взяли два повара одновременно).

5 Есть отдельные очереди на пиццу/роллы.

Реализация брокера очередей в виде RabbitMQ

Плавно переходим к реализации брокера очередей в виде RabbitMQ. Это гибридный брокер, он поддерживает несколько протоколов.

Сегодня наиболее распространённый протокол — это AMQP. О нём по большей части и будем вести речь.

Есть и другие протоколы, например MQTT, когда на TCP/IP повешена возможность подписки и получения, исполнения сообщений.

Слайд № 9

В AMQP есть несколько дефолтных сущностей, с которыми и происходит работа:

1 producer — отправитель сообщения;

2 message — само сообщение;

3 exchange — пункт роутинга сообщений (здесь мы можем указать, куда какое сообщение должно пойти);

4 queue — сама очередь из сообщений;

5 consumer — исполнитель, который из этой очереди что-то заберёт и что-то сделает.

Если смотреть принципиально (сверху), то AMQP-протокол вообще и RabbitMQ в частности представляют собой следующую модель.

Слайд № 10

Producer публикует сообщения в брокер, брокер хранит это сообщение в очереди (и понимает, в какой из очередей его надо хранить), затем доставляет его исполнителю. Либо исполнитель подписывается на сообщения и, как только они появляются, пытается их исполнить. Брокер также может возвращать сообщения в очередь, чтобы они оставались там при необходимости.

Самый банальный пример такой модели — создание PDF.

Слайд № 11

Почему создание PDF и подобных задач имеет смысл закинуть в очередь? Обычно они занимают много времени, и тратить его, пока наш пользователь находится между моментом совершения заказа и моментом отправки файла, не хочется. В большинстве случаев разработчики пишут так: «Спасибо за ваш запрос, мы создадим PDF и отправим его вам на почту». Если проект небольшой, то логика при этом обычно происходит в MySQL-таблицах. Но как только встаёт вопрос масштабирования, MySQL даёт сбой — возможны блокировки, отсутствие управления очередью, медленная (по сравнению с memory-based) скорость обработки. Реляционная база данных хороша, но лучше хранить сообщения в хранилище побыстрее и более предназначенном для этого. Мы можем записать этот запрос в очередь и уже из очереди выполнять его и отправлять пользователю.

Сообщение — это не интерпретируемая брокером единица сущности. Это может быть как просто строка, так и объект в виде JSON или любая структура, представленная в виде строки.

Неважно, что мы туда запишем; мы можем указать тип сообщения (например application/JSON), и тогда при обработке будет легко узнать, что закладывалось в начале. Сообщение не интерпретируется, оно просто хранится.

Затем оно попадает в exchange, в точку роутинга. Задача точки роутинга — определить, в какую из очередей должно попасть сообщение.

Вместе с сообщением мы можем прислать какой-то ключ. По ключу мы можем понять, в какую из очередей (одну или несколько) должно попасть сообщение.

Соответственно, есть несколько видов точки роутинга, видов работы:

1 простейший вариант — fanout, когда сообщение попадает во все доступные очереди;

2 direct — полное совпадение ключа. Мы можем создать очередь для одного ключа, очередь для другого; у очереди может быть несколько ключей — всё как в обычном STP-роутинге;

3 topic — ключ удовлетворяет маске (#*), где * — любое непустое слово (одно), а # — то же самое, что и *, только здесь допустимо и пустое слово;

4 headers — мы имеем возможность указать какой-то тип прямо в message и роутиться по нему (используется, когда нет отдельного ключа).

Слайд № 12

Кроме того, очереди представляют собой обычную модель, в которой нет ничего сложного. Из очереди мы берём сообщение путём подключения к брокеру и запроса из очереди либо путём постоянного подключения и запроса из очереди.

Какие фишки подразумеваются в очереди и уже реализованы в Rabbit'е

Слайд № 13

1. Durable

Durable — возможность сохранения состояния при перезагрузке сервера. Например, сервер «уронили» (или он «упал» сам), а очередь нужно не потерять. Для скорости работы очередь хранится в RAM (random-access memory), поэтому, чтобы обеспечить её персистентное состояние, мы должны дополнительно сохранять очередь куда-то на диск. Тут логика работы аналогична любому RAM storage типа Redis и т. п. Мы просто делаем дамп, храним его на диске, в случае «падения» поднимаем дамп с диска — и всё возвращается на круги своя (и никакие сообщения мы не потеряли, кроме периода даунтайма).

Есть несколько режимов создания дампов.

Например, в версии 3.9 есть lazyload, который делает полный дамп при каждом запросе, но он значительно сажает производительность. Можно поступить проще: «подрубиться» к RabbitMQ через Redis и использовать хранилище Redis как основное.

Кроме того, нужно логировать, создавать отдельную очередь, которая бы писала всеми любимую реляционную базу данных. Best practice для high-load-проектов, когда мы держим только кеш в состоянии памяти (все новые заказы например). Одно сообщение можно отправить в несколько очередей, и одна из очередей будет использоваться для связи с реляционной базой данных и для постоянного хранения в ней — так же, как у тебя, наверно, сделано и везде с Elastic (или MySQL, или PostgreSQL, или чем-нибудь другим, что у тебя используется на backend'е). В случае с Elastic он используется как кеш и связывается с постоянным хранилищем. В случае «смерти» Elastic'а всё будет нормально, мы не потеряем данные, а вот в случае «смерти» базы данных нормально уже не будет.

2. TTL & expiration

В time to live мы можем указать каждому сообщению или всем сообщениям в очереди (очереди целиком), сколько они будут жить. Например, нам поступает заказ. Мы понимаем, что через 60 минут он уже станет неактуальным (на примере биржи — через час уже значительно изменятся ставки; на примере шаурмы — ни один клиент не будет ждать шаурму час), и мы можем задать TTL, чтобы каждое сообщение, которое пролежит больше 60 минут, исчезало из очереди.

3. ACK — acknowledge

Сообщения могут быть нескольких видов, и самые популярные из них два:

1) сообщение, которое нужно сразу удалить из очереди, как только оно было взято, не дожидаясь ответа от того, кто его забрал (это и есть ACK);

2) сообщение, которое не нужно удалять из очереди, пока не получен нужный ответ. Это сообщение мы помечаем и блокируем, чтобы другие не взяли его. Если нам не вернётся ответ, оно останется у нас, и другие исполнители смогут взять его.

Кейс: актуальные варианты использования ACK.

Например, сайт связан с каким-то сторонним сервисом, есть очередь из оплат или трек-номеров посылок. Мы берём одно сообщение, отправляем его в сторонний сервис, а он оказывается (внезапно) под высокой нагрузкой и недоступен. Если бы у нас не было блокировки, это сообщение исчезло бы из очереди, и мы бы потеряли клиента. Чтобы этого избежать, можно вернуть ответ «не получилось» или разделить таймаут по времени таким образом, чтобы, если ответ от поставщика данных задержался более чем на 15 секунд, сообщение возвращалось в очередь.

4. Dead lettering

Dead lettering — когда наше сообщение вернулось с ошибкой (блек-джеком) в очередь и нам нужно обработать его не сразу, а с задержкой 5–10 секунд. Такое часто бывает, когда провайдер данных не доступен (т. е. провайдер возвращает нам ошибку, скажем, 404, 504 или что-то подобное). В этом нет ничего страшного, мы возвращаем сообщение в очередь и при возврате можем указать, через какое время этот элемент будет доступен для повторного просмотра.

В общем и целом это работает примерно таким образом.

Слайд № 14

У нас есть exchanges разных типов. Ниже представлены примеры, в каких случаях они могут быть использованы.

Слайд № 15

Допустим, у нас есть разные routing keys, которые состоят из нескольких ключей, и тогда нам будут полезны разные вариации exchange. Если у нас указаны несколько ключей, например europe.weather, то это сообщение нужно поместить и в europe, и в weather. Если usa.weather — и в usa, и в weather. Таким образом, мы можем разроутить все сообщения сразу в несколько очередей.

  • Переходим к реализации

Чтобы начать работать с Rabbit, нам не нужно никаких суперприблуд, нужен банальный composer. Пуллим из Docker образ и запускаем его. Советую пуллить по тегу 3-management. По этому тегу будет доступен крайний стабильный релиз RabbitMQ; приписка management означает, что он будет поставлен вместе с панелью администрирования в виде Web UI (пользовательский интерфейс, представленный в виде сайта, который запускается в web-браузере).

Слайд № 16

Выглядит это примерно так.

Слайд № 17

Это очень удобно, примерно как MySQL сразу с phpMyAdmin. Соответственно, здесь мы можем увидеть сразу все наши очереди (queues), все наши соединения с RabbitMQ (connections), каналы (channels), обменники (exchanges) — в общем, всю информацию подробно.

Слайд № 18

Также здесь мы видим статистику: сколько сообщений было обработано; сколько сообщений не было обработано; какие сообщения не попали ни в одну очередь. В RabbitMQ есть очереди по умолчанию (default), куда попадают все сообщения, которые никому не подошли по роутингу, — мы можем разобраться с ними позже.

В PHP всё подключается довольно просто. При создании соединения мы всегда должны указывать параметры, поэтому держать отдельный instance, который будет задавать параметры конфига в Rabbit, нет смысла. Если такой очереди, или такого обменника, или чего-то ещё не было раньше, мы всегда создаём их заново, а RabbitMQ у нас крутится постоянно, т. е. он работает персистентно. У нас есть название exchange, название очереди, мы создаём коннект на наш хост, порт, используем данные авторизации — и открывается канал для работы по этому соединению.

Чтобы создать новую очередь, есть простейшая функция queue_declare, где мы указываем:

1 название данной очереди;

2 passive: true или false (обычно false; true используется, если нужно обратиться к очереди, не изменяя её состояния, т. е., например, просто проверить, что она существует);

3 durable: true или false (будет ли данный юнит выживать при рестарте сервера, хранится ли он на диске или нет — the queue will survive server restarts);

4 exclusive: true или false (может ли быть доступна очередь из других каналов — мы можем ограничить очередь в рамках одного соединения, но это очень редко используемый кейс, обычно exclusive: false);

5 auto_delete: true или false (очищается ли очередь автоматически, когда соединение закрывается).

Вот и все настройки очереди.

После настройки очереди нам необходимо сделать собственно exchange (точку роутинга) и привязать его к какой-то из очередей (сделать binding).

Слайд № 19

У exchange тоже немного параметров:

  • type (всего четыре типа, о которых мы уже говорили ранее);
  • passive;
  • durable (будем ли хранить юнит при рестарте);
  • auto_delete (очищается ли очередь, когда соединение закрывается).

После того как новый exchange задекларирован, мы можем связать нашу предыдущую очередь с нашим обменником.

  • Пришла пора дополнить картину мира при помощи записи и получения сообщений в нашу очередь.
  • Публикация также довольно проста: мы можем сразу указать текст сообщения, тип контента, который содержит сообщение, и delivery_mode — как быстро сообщение появится в очереди.

Слайд № 20

Вызываем basic_publish — и всё отправляется сразу в очередь.

Слайд № 21

Затем мы можем из очереди что-то получить. Для этого используем basic_get, указываем, из какой очереди получаем, указываем message. В этом message дополнительно хранится (помимо нашего сообщения и данных, которые мы ему задали) информация о том, какой канал был использован и какой используется сейчас, какой использовался routing_key, и в целом всё, что было вокруг нашего сообщения во время его обработки.

Далее используем basic_ack (acknowledge), т. е. указываем базовую систему работы с блокировкой, получаем тестовое сообщение (в нашем случае — body) и можем его посмотреть.

Теперь поговорим про consume — это подключение в очереди с ожиданием, что в ней будут приходить сообщения. В принципе, мы могли бы сделать basic_get, обернуть его в цикл и просто ждать сообщение из очереди. Но в очереди может ничего не оказаться, кроме того, мы будем идти неоптимальным путём. Поэтому и придумали consume — это продолжительное соединение, через которое можно что-то получать. Рассмотрим, как оно указывается.

Слайд № 22

Первый параметр — очередь.

Далее:

  • consumer_tag — это идентификатор consumer'а;
  • nowait — не ждать ответа от сервера (т. е. продолжать получение);
  • callback — функция PHP, которая будет вызвана при получении сообщения. Получается немного асинхронная модель работы. Поскольку мы делаем process_message и указываем callback-функцию, мы должны эту функцию обговорить и обозначить.

Слайд № 23

Она не сильно сложна: нам приходит сообщение, и мы можем с ним что-то сделать. Также мы можем выйти из этого цикла, т. е. приостановить consume изнутри callback'а. Для этого нам необходимо, как указано в примере, проверять на body. Если в body, как в примере, нам пришёл quit, нужно выйти. Чтобы выйти, мы через message возвращаемся к объекту channel, который содержится в свойстве сообщения, и берём basic_cancel с каким-то тегом. Consumer_tag показывает, кто взял это сообщение для обработки. Соответственно, мы выходим — и наш цикл «падает», заканчивается.

Цикл while нам всё равно понадобится, но решение через consume (против решения через постоянный запрос на получение сообщения) более оптимально, т. к. соединение не создаётся для каждого запроса.

Здесь register_shutdown_function — стандартная функция PHP, в которой мы можем обозначить callback-функцию (например shutdown), которая будет выполнена по окончании работы скрипта. Когда скрипт заканчивает работу, нам всегда лучше закрывать соединения. Соответственно, мы регистрируем функцию, которая будет выполняться при завершении скрипта — в нашем примере это функция shutdown с аргументами channel и connection.

Слайд № 24

В итоге этих действий мы добавили к очереди сообщение с текстом, у нас вызывается basic_cancel и закрываются каналы и соединения.

В delivery хранится дополнительная информация о том, как обрабатывалось сообщение.

Слайд № 25

Если нужно использовать много сообщений сразу, лучше задействовать publish_batch (создаёт транзакцию из нескольких сообщений). Это очень просто.

Слайд № 26

Лучшие практики использования RabbitMQ

Best practices

1 Keep your queue short

Стараемся не давать своей очереди «разрастаться». В длине очереди нас ограничивает не только RAM, но и количество сообщений. Rabbit создан не для хранения гигантских очередей — он создан для доставки сообщения от чего-то очень простого (например нашего frontend'а или какой-то отправной точки) к чему-то сложному (типа нашего backend'а, который, возможно, будет обрабатывать сообщения очень долго).

2 Lazy queues (3.9+)

Если мы переживаем за сохранность данных, используем медленные очереди. Хотя я не считаю, что это best practice, т. к. страдает производительность. Мы получаем сохранение на диск, что не очень быстро, но это гарант того, что данные не потеряются никогда.

3 Limit queue size with TTL or max-length

Ограничиваем размер очереди через ограничение длины сообщения или времени его жизни. Тут всё логично: поскольку мы работаем с памятью, нам не надо, чтобы она «текла» и использовалась не по назначению.

Итог

Сегодня отказоустойчивость и готовность сервиса к масштабированию — важные критерии, которые отличают по-настоящему хороший сервис. Достичь этого помогают в том числе и очереди сообщений во главе с брокером сообщений. Понимание кейсов, в которых лучше переложить работу с сообщениями на брокер очередей, очень важно в современной разработке. Даже если наш сервис не будет масштабироваться на много worker'ов и много очередей, сама очередь (и работа с ней через брокер) может быть хорошим буфером для снижения нагрузки и гарантом доставки и обработки сообщений, что позволит нашему сервису продолжить обработку без потерь, даже если во время обработки что-то пойдёт не так.

Оглавление
Другие статьи

Смотреть все

Как уменьшить затраты на поддержание актуальной информации о товарах на маркетплейсах с помощью ESB

29/5/2023

Подробнее

Сервис-ориентированная архитектура. Цифровая копия бизнеса

22/1/2022

Подробнее

Введение в MDM- и PIM-системы на примере Akeneo и Pimcore

7/4/2020

Подробнее

Смотреть все

Мы используем файлы cookie, чтобы предоставить наилучшие возможности сайта

Ок